<?php

use \PhpAmqpLib\Exception\AMQPTimeoutException;

/**
 * 订阅消息
 * @Author: Helloweba
 * @Date:   2020-01-01 20:24:57
 * @Last Modified by:   Helloweba
 * @Last Modified time: 2020-01-05 20:17:16
 */

require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$config = getConfig();
$exchange = 'article';

$routerKey = 'fantasy'; //只消费玄幻类消息

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
    $config['host'],
    $config['port'],
    $config['user'],
    $config['password']
);

$channel = $connection->channel();

$channel->exchange_declare($exchange, 'direct', false, false, false);

// durable 持久化
// 可以验证，若队列不是持久化的，或发送的消息未设置持久化的属性，在RabbitMQ重启后，消息都会丢失。

// exclusive 排他性
// 对于排他队列，只有创建它的连接有权访问，连接断开后，排他队列将自动删除。这种队列适用于一个队列仅对应一个客户端收发消息的场景。
list($queueName, ,) = $channel->queue_declare("fantasy", false, false, false, true);

// echo '$queueName:' . $queueName . PHP_EOL;

$channel->queue_bind("fantasy", $exchange, $routerKey);

echo " [*] Waiting for messages. To exit press CTRL+C" . PHP_EOL;

$callback = function ($msg) {
    // print_r($msg);
    echo ' Received message：', $msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    // file_put_contents('demo.txt', $msg->delivery_info['routing_key'], FILE_APPEND);
    // sleep(1);  //模拟耗时执行
};

$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    try {
        $channel->wait(null, false, 10);
    } catch (AMQPTimeoutException $e) {
        $connection->checkHeartBeat();
        continue;
    } catch (Exception $e) {
        file_put_contents('debug.txt', $e, FILE_APPEND);
        exit;
    }
}
//while ($connection->isConnected()) {
//    $channel->wait(null, false, 10);
//}

//$channel->close();
//$connection->close();
